


/*
这里存放事件编排引擎运行期间的各种实时可观测性指标
Source
    初始化成功次数、失败次数
    收到的总的事件数、依据配置的分类统计数
Space
    进入该Space的总的事件数、依据配置的分类统计数、事件在Space的逗留时间
WorkNode
    进入该WorkNode的总的事件数、跟Space相同的分类统计数、事件在WorkNode的逗留时间
*/

use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockReadGuard};

mod store;
mod mode_input;

pub use store::indicator_store_run;
pub use mode_input::{work_space_input_register, work_node_input_register, sink_input_register, restful_channel_input};

#[derive(Deserialize)]
pub struct IndicatorsStoreConf{
    enable: bool,
    file: String,
    interval: u64,//seconds
}

pub enum Op{
    Add,
    Sub,
}

#[derive(Eq,PartialOrd, PartialEq,Hash,Debug,Serialize,Deserialize,Clone)]
pub enum IndicatorsMode{
    Source,
    Sink,
    WorkSpace,
    WorkNode,
    PhysicalPlan,
    Channel,
    Timer,
}
#[derive(Serialize,Debug)]
pub struct IndicatorsKeyValue{
    pub key_value: HashMap<String, AtomicUsize>,
}
impl IndicatorsKeyValue{
    fn new()->Self{
        Self{
            key_value: HashMap::new(),
        }
    }
}

#[derive(Serialize,Debug)]
pub struct Indicators{
    pub mode: HashMap<String, IndicatorsKeyValue>,
}

impl Indicators{
    fn new()->Self{
        Self{
            mode: HashMap::new(),
        }
    }
}

lazy_static!{
    #[derive(Debug)]
    pub static ref INDICATORS_VALUES: RwLock<HashMap<IndicatorsMode, Indicators>> = {
        RwLock::new(HashMap::new())
    };
}

pub fn indicators_values_ref()->RwLockReadGuard<'static, HashMap<IndicatorsMode, Indicators>>{
    if let Ok(t) = INDICATORS_VALUES.read(){
        t
    }else{
        error!("INDICATORS_VALUES read err");
        std::process::exit(-1);
    }
}

pub fn indicators_key_ref(op: Op, mode: IndicatorsMode, name: &str, key: &str, value: usize){

    if let Ok(indicators_values) = INDICATORS_VALUES.read(){
        if let Some(t) = indicators_values.get(&mode){
            if let Some(t) = t.mode.get(name){
                if let Some(t) = t.key_value.get(key){
                    match op{
                        Op::Add => {t.fetch_add(value, Ordering::SeqCst);},
                        Op::Sub => {t.fetch_sub(value, Ordering::SeqCst);},
                    }
                }else{
                    drop(indicators_values);
                    indicators_new(op, mode, name, key, value);
                }
            }else{
                drop(indicators_values);
                indicators_new(op, mode, name, key, value);
            }
        }else{
            drop(indicators_values);
            indicators_new(op, mode, name, key, value);
        }
    }else{
        error!("INDICATORS_VALUES write err");
        std::process::exit(-1);
    }
}

fn indicators_new(op: Op,mode: IndicatorsMode,name: &str,key: &str,value: usize){

    if let Ok(mut indicators_values) = INDICATORS_VALUES.write(){
        if let Some(t) = indicators_values.get_mut(&mode){
            if let Some(t) = t.mode.get_mut(name){
                if let Some(t) = t.key_value.get_mut(key){
                    match op{
                        Op::Add => {t.fetch_add(value, Ordering::SeqCst);},
                        Op::Sub => {t.fetch_sub(value, Ordering::SeqCst);},
                    }
                }else{
                    t.key_value.insert(key.to_string(), AtomicUsize::new(value));
                }
            }else{
                t.mode.insert(name.to_string(), IndicatorsKeyValue::new());
                drop(indicators_values);
                indicators_key_ref(op, mode, name, key, value);
            }
        }else{
            indicators_values.insert(mode.clone(), Indicators::new());
            drop(indicators_values);
            indicators_key_ref(op, mode, name, key, value);
        }
    }else{
        error!("INDICATORS_VALUES write err");
        std::process::exit(-1);
    }
}

pub fn restful_indicators()->String{
    let mut tmp = HashMap::new();

    if let Ok(indicators_values) = INDICATORS_VALUES.read(){
        for (k,v) in indicators_values.iter(){
            let mut tmp_mode = HashMap::new();

            for (kk, vv) in v.mode.iter(){
                let mut tmp_kv = HashMap::new();

                for (kkk,vvv) in vv.key_value.iter(){
                    tmp_kv.insert(kkk.to_string(), vvv.load(Ordering::SeqCst));
                }

                tmp_mode.insert(kk.to_string(), tmp_kv);
            }

            tmp.insert(k.clone(), tmp_mode);
        }
    }else{
        error!("INDICATORS_VALUES read err");
        std::process::exit(-1);
    }

    match serde_json::to_string_pretty(&tmp){
        Ok(t) => {t},
        Err(e) => {
            format!("serde_json err {:?}", e)
        },
    }
}